package cn.doitedu.etl;

import org.apache.commons.lang3.StringUtils;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.functions.ScalarFunction;

/**
 * Flink SQL streaming job that lightly aggregates dimension-enriched mall events into
 * per-minute page-view counts and writes them to a Doris table.
 *
 * <p>Pipeline: Kafka topic {@code mall-evts-comdim-w} (JSON) -> 1-minute tumbling event-time
 * windows grouped by user, session and traffic dimensions -> Doris table
 * {@code dws.mall_tfcag_01}.
 *
 * @Author: deep as the sea
 * @Site: <a href="www.51doit.com">DOIT Education</a>
 * @QQ: 657270652
 * @Date: 2022/12/10
 * @Tips: Learn big data at DOIT Education
 * @Desc: Traffic-overview ETL job with minute-level light aggregation
 **/
public class E02_EtlJob_TrafficMinuteAgg {

    /**
     * Job entry point: configures the execution environment, registers the source and sink
     * tables plus the {@code time_round} UDF, then submits the aggregation INSERT.
     *
     * @param args command-line arguments (not used)
     */
    public static void main(String[] args) {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Exactly-once checkpoints every 2000 ms, persisted to a local file-system path.
        env.enableCheckpointing(2000, CheckpointingMode.EXACTLY_ONCE);
        env.getCheckpointConfig().setCheckpointStorage("file:/d:/checkpoint");
        env.setParallelism(1);

        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

        createKafkaSourceTable(tEnv);
        createDorisSinkTable(tEnv);

        // Truncating timestamps to 30-minute / 10-minute buckets is awkward with the built-in
        // SQL functions, so a small custom scalar function does it instead.
        tEnv.createTemporaryFunction("time_round", TimeRound.class);

        submitMinuteAggregation(tEnv);
    }

    /**
     * Registers the Kafka connector source table holding the dimension-enriched events.
     * {@code rt} is a computed event-time column derived from the epoch-millisecond
     * {@code event_time}; its watermark is declared with zero delay.
     */
    private static void createKafkaSourceTable(StreamTableEnvironment tEnv) {
        tEnv.executeSql(
                " CREATE TABLE mall_events_commondim_kfksource(          "
                        + "     user_id           INT,                            "
                        + "     username          string,                         "
                        + "     session_id        string,                         "
                        + "     event_id          string,                         "
                        + "     event_time        bigint,                         "
                        + "     lat               double,                         "
                        + "     lng               double,                         "
                        + "     release_channel   string,                         "
                        + "     device_type       string,                         "
                        + "     properties        map<string,string>,             "
                        + "     register_phone    STRING,                         "
                        + "     user_status       INT,                            "
                        + "     register_time     TIMESTAMP(3),                   "
                        + "     register_gender   INT,                            "
                        + "     register_birthday DATE, register_province STRING, "
                        + "     register_city STRING, register_job STRING, register_source_type INT,   "
                        + "     gps_province   STRING, gps_city STRING, gps_region STRING,             "
                        + "     page_type   STRING, page_service STRING,         "
                        + "     rt as to_timestamp_ltz(event_time,3) ,           "
                        + "     watermark for  rt as rt - interval '0' seconds   "
                        + " ) WITH (                                             "
                        + "  'connector' = 'kafka',                              "
                        + "  'topic' = 'mall-evts-comdim-w',                     "
                        + "  'properties.bootstrap.servers' = 'doitedu:9092',    "
                        + "  'properties.group.id' = 'testGroup',                "
                        + "  'scan.startup.mode' = 'earliest-offset',            "
                        + "  'value.format'='json',                              "
                        + "  'value.json.fail-on-missing-field'='false',         "
                        + "  'value.fields-include' = 'EXCEPT_KEY')              ");
    }

    /**
     * Registers the Doris connector sink table that receives the lightly aggregated rows.
     * The label prefix embeds the current wall-clock millis, so each launch of the job
     * uses a different prefix.
     */
    private static void createDorisSinkTable(StreamTableEnvironment tEnv) {
        tEnv.executeSql(
                " CREATE TABLE mall_tfcag_01_dorissink(         "
                        +" dt  DATE,                 "
                        +" time_60m STRING,          "
                        +" time_30m STRING,           "
                        +" time_10m STRING,          "
                        +" time_m STRING,            "
                        +" user_id INT,              "
                        +" is_newuser INT,           "
                        +" session_id STRING,        "
                        +" release_channel STRING,   "
                        +" device_type STRING,       "
                        +" gps_province STRING,      "
                        +" gps_city STRING,          "
                        +" gps_region STRING,        "
                        +" page_type STRING,         "
                        +" page_service STRING,      "
                        +" page_url STRING,          "
                        +" pv_amt BIGINT             "
                        + " ) WITH (                               "
                        + "    'connector' = 'doris',              "
                        + "    'fenodes' = 'doitedu:8030',         "
                        + "    'table.identifier' = 'dws.mall_tfcag_01',  "
                        + "    'username' = 'root',                "
                        + "    'password' = '',                    "
                        + "    'sink.label-prefix' = 'doris_label_dws"+System.currentTimeMillis()+"'"
                        + " )                                         ");
    }

    /**
     * Submits the INSERT that selects the required fields, adds the hour / 30-minute /
     * 10-minute / minute time dimensions, and counts page views at the finest (1-minute)
     * granularity using a tumbling window on {@code rt}.
     *
     * <p>Requires the source table, the sink table and the {@code time_round} function to be
     * registered on {@code tEnv} beforehand.
     */
    private static void submitMinuteAggregation(StreamTableEnvironment tEnv) {
        // NOTE(review): is_newuser compares the registration date with CURRENT_DATE rather than
        // with the event date (dt). Since the source starts from 'earliest-offset', replayed
        // events may be flagged differently from "registered on the event day" - confirm intent.
        tEnv.executeSql(
                " INSERT INTO  mall_tfcag_01_dorissink                                     "
                +" WITH tmp AS (                                                                    "
                +" SELECT                                                                           "
                +"     user_id,                                                                     "
                +"     IF(DATE_FORMAT(register_time,'yyyy-MM-dd')<CURRENT_DATE,0,1) AS is_newuser,  "
                +"     session_id,                                                                  "
                +"     TO_DATE(DATE_FORMAT(TO_TIMESTAMP_LTZ(event_time, 3),'yyyy-MM-dd')) as dt,    "
                +"     release_channel,                                                             "
                +"     device_type,                                                                 "
                +"     gps_province,                                                                "
                +"     gps_city,                                                                    "
                +"     gps_region,                                                                  "
                +"     page_type,                                                                   "
                +"     page_service,                                                                "
                +"     properties['url'] AS page_url,                                               "
                +"     rt                                                                           "
                +" FROM mall_events_commondim_kfksource                                             "
                +" )                                                                                "
                +" SELECT                                                                           "
                +"     dt,                                                                          "
                +"     time_round(CAST(window_start AS STRING),60) AS time_60m,                     "
                +"     time_round(CAST(window_start AS STRING),30) AS time_30m,                     "
                +"     time_round(CAST(window_start AS STRING),10) AS time_10m,                     "
                +"     CAST(window_start AS STRING) AS time_m,                                      "
                +"     user_id,                                                                     "
                +"     is_newuser,                                                                  "
                +"     session_id,                                                                  "
                +"     release_channel,                                                             "
                +"     device_type,                                                                 "
                +"     gps_province,                                                                "
                +"     gps_city,                                                                    "
                +"     gps_region,                                                                  "
                +"     page_type,                                                                   "
                +"     page_service,                                                                "
                +"     page_url,                                                                    "
                +"     COUNT(1) AS pv_amt                                                           "
                +" FROM TABLE(                                                                      "
                +"     TUMBLE(TABLE tmp, DESCRIPTOR(rt), INTERVAL '60' seconds)                     "
                +" )                                                                                "
                +" GROUP BY                                                                         "
                +"     window_start,                                                                "
                +"     window_end,                                                                  "
                +"     dt,                                                                          "
                +"     user_id,                                                                     "
                +"     is_newuser,                                                                  "
                +"     session_id,                                                                  "
                +"     release_channel,                                                             "
                +"     device_type,                                                                 "
                +"     gps_province,                                                                "
                +"     gps_city,                                                                    "
                +"     gps_region,                                                                  "
                +"     page_type,                                                                  "
                +"     page_service,                                                                  "
                +"     page_url                                                                  "
        );
    }


    /**
     * Scalar UDF (registered as {@code time_round}) that truncates the minute field of a
     * timestamp string down to a multiple of {@code interval} minutes and zeroes the seconds.
     *
     * <p>Example: {@code eval("2022-12-13 15:44:00.000", 30)} returns
     * {@code "2022-12-13 15:30:00.000"}; with interval 60 the result is
     * {@code "2022-12-13 15:00:00.000"}.
     */
    public static class TimeRound extends ScalarFunction {

        /**
         * Floors the minute part of {@code dateTime} to the nearest lower multiple of
         * {@code interval}.
         *
         * @param dateTime timestamp text whose first two colon-separated parts are
         *                 {@code "<date> <hour>"} and the minute, e.g.
         *                 {@code "2022-12-13 15:44:00.000"}; may be null
         * @param interval bucket size in minutes; must be positive
         * @return {@code "<date> <hour>:<floored minute>:00.000"}, or null when
         *         {@code dateTime} is null
         * @throws IllegalArgumentException if {@code interval} is not positive
         * @throws NumberFormatException if the minute part is not an integer
         */
        public String eval(String dateTime, int interval) {
            if (dateTime == null) {
                return null;
            }
            if (interval <= 0) {
                throw new IllegalArgumentException(
                        "interval must be a positive number of minutes, got " + interval);
            }

            // "2022-12-13 15:44:00.000" -> ["2022-12-13 15", "44", "00.000"]
            String[] parts = dateTime.split(":");
            int minute = Integer.parseInt(parts[1]);

            // Integer division floors to the bucket index; multiplying back gives the bucket's
            // starting minute for any interval, not only 10 / 30 / 60.
            int flooredMinute = (minute / interval) * interval;

            // Left-pad so single-digit minutes (e.g. 0 or 5) render as "00" / "05".
            return parts[0] + ":" + StringUtils.leftPad(String.valueOf(flooredMinute), 2, '0') + ":00.000";
        }
    }
}
